package com.menghao.rpc.netty;

import com.menghao.rpc.NamedThreadFactory;
import com.menghao.rpc.netty.in.TcpInboundHandler;
import com.menghao.rpc.netty.in.TcpMessageDecoder;
import com.menghao.rpc.netty.out.TcpMessageEncoder;
import com.menghao.rpc.provider.model.RpcResponse;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.List;

/**
 * <p>TCP client built on top of Netty.<br>
 *
 * <p>Every instance configures its own {@link Bootstrap}, but all instances share a single
 * lazily created {@link EventLoopGroup}. The shared group is shut down gracefully by a JVM
 * shutdown hook registered when this class is loaded.
 *
 * <p>Typical usage: construct, call {@link #initBootstrap()} once, then call
 * {@link #connect(String, int)} for each remote endpoint.
 *
 * @author MarvelCode.
 */
public class TcpClient {

    /** Connect timeout (milliseconds) applied when the configured value is not positive. */
    private static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;

    /**
     * I/O event loop group shared by every client in this JVM. Created on first use by
     * {@link #sharedWorkerGroup()}; stays {@code null} until then.
     */
    private static EventLoopGroup workerGroup;

    private Bootstrap bootstrap;

    private final int connectTimeout;

    private final int maxFrameLength;

    private final int readIdle;

    private final int writIdle;

    private final List<TcpMessageHandler> messageHandlers;

    /**
     * Creates a client; no Netty resources are allocated until {@link #initBootstrap()} is called.
     *
     * @param connectTimeout  connect timeout in milliseconds; values {@code <= 0} fall back to 5000
     * @param maxFrameLength  maximum frame length handed to the {@code TcpMessageDecoder}
     * @param readIdle        reader idle time passed to {@link IdleStateHandler} (seconds)
     * @param writIdle        writer idle time passed to {@link IdleStateHandler} (seconds)
     * @param messageHandlers handlers handed to the {@code TcpInboundHandler} of each channel
     */
    public TcpClient(int connectTimeout, int maxFrameLength, int readIdle, int writIdle, List<TcpMessageHandler> messageHandlers) {
        this.connectTimeout = connectTimeout;
        this.maxFrameLength = maxFrameLength;
        this.readIdle = readIdle;
        this.writIdle = writIdle;
        this.messageHandlers = messageHandlers;
    }

    static {
        // Release the shared event loop group when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdownWorkerGroup();
            }
        });
    }

    /**
     * Returns the shared event loop group, creating it on first use.
     *
     * <p>Synchronized so that clients initialised concurrently never create (and leak) more
     * than one group.
     */
    private static synchronized EventLoopGroup sharedWorkerGroup() {
        if (workerGroup == null) {
            // 0 threads lets Netty pick its default thread count.
            // NOTE(review): the "netty-server-io" prefix is kept as-is although this is the
            // client side; the second argument is presumably the daemon flag — confirm
            // against NamedThreadFactory.
            workerGroup = new NioEventLoopGroup(0, new NamedThreadFactory("netty-server-io", true));
        }
        return workerGroup;
    }

    /** Shuts the shared group down if it was ever created; a no-op otherwise. */
    private static synchronized void shutdownWorkerGroup() {
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Builds the {@link Bootstrap} used by {@link #connect(String, int)}.
     *
     * <p>Calling this method again replaces the bootstrap of this instance but keeps reusing
     * the shared event loop group instead of allocating a new one.
     */
    public void initBootstrap() {
        bootstrap = new Bootstrap();
        int ct = connectTimeout > 0 ? connectTimeout : DEFAULT_CONNECT_TIMEOUT_MILLIS;
        bootstrap.group(sharedWorkerGroup())
                // Connect timeout in milliseconds
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ct)
                // Disable Nagle's algorithm so small writes are sent without delay
                .option(ChannelOption.TCP_NODELAY, true)
                // Enable TCP keep-alive probes (long-lived connections)
                .option(ChannelOption.SO_KEEPALIVE, true)
                // Allow the local address to be reused
                .option(ChannelOption.SO_REUSEADDR, true)
                // Reuse buffers through the pooled allocator
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // The client serializes RpcRequest and deserializes RpcResponse
                        ch.pipeline().addLast(new TcpMessageEncoder())
                                .addLast(new TcpMessageDecoder(maxFrameLength, RpcResponse.class))
                                .addLast(new IdleStateHandler(readIdle, writIdle, 0))
                                .addLast(new TcpInboundHandler(messageHandlers));
                    }
                });
    }

    /**
     * Connects to the given endpoint and blocks until the attempt has completed.
     *
     * @param ip   remote host or IP address
     * @param port remote port
     * @return the completed connect future
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws IllegalStateException if {@link #initBootstrap()} has not been called yet
     */
    public ChannelFuture connect(String ip, int port) throws InterruptedException {
        if (bootstrap == null) {
            throw new IllegalStateException("initBootstrap() must be called before connect()");
        }
        return bootstrap.connect(ip, port).sync();
    }
}
